﻿using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System;
using System.IO;
using System.Net.Sockets;


namespace EventBus.RabbitMQ
{
    /// <summary>
    /// Default <see cref="IRabbitMQPersistentConnection"/> implementation. It owns a single
    /// RabbitMQ connection, creates channels from it and re-establishes it (with retries)
    /// when the client reports a shutdown, a callback exception or a blocked connection.
    /// </summary>
    public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        #region fields & properties

        /// <summary>Number of retries performed after the first failed connection attempt.</summary>
        private const int RetryCount = 5;

        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;

        /// <summary>Serializes connection attempts so only one caller at a time runs <see cref="TryConnect"/>.</summary>
        private readonly object _syncRoot = new object();

        private IConnection _connection;
        private bool _disposed;

        /// <summary>
        /// Indicates whether a connection exists, is open and this instance has not been disposed.
        /// </summary>
        public bool IsConnected
        {
            get { return _connection != null && _connection.IsOpen && !_disposed; }
        }

        #endregion

        #region ctors

        /// <summary>
        /// Creates the persistent connection wrapper. The connection factory is obtained from
        /// <c>RabbitMQConnectionProvider.CreateFactory()</c>; no connection is opened here.
        /// </summary>
        /// <param name="logger">Logger used for connection diagnostics.</param>
        public DefaultRabbitMQPersistentConnection(ILogger<DefaultRabbitMQPersistentConnection> logger)
        {
            _connectionFactory = RabbitMQConnectionProvider.CreateFactory();
            _logger = logger;
        }

        #endregion

        #region methods

        /// <summary>
        /// Creates a new AMQP channel (model) on the current connection.
        /// </summary>
        /// <returns>The channel created by the underlying connection.</returns>
        /// <exception cref="InvalidOperationException">Thrown when <see cref="IsConnected"/> is false.</exception>
        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("无可用的RabbitMQ连接");
            }
            return _connection.CreateModel();
        }

        /// <summary>
        /// Releases the underlying connection. Safe to call more than once and safe to call
        /// when no connection was ever established.
        /// </summary>
        public void Dispose()
        {
            if (_disposed) return;

            _disposed = true;
            try
            {
                // _connection is still null when TryConnect never succeeded.
                _connection?.Dispose();
            }
            catch (IOException ex)
            {
                // Pass the exception as the first argument so the logger records the stack trace.
                _logger.LogError(ex, "{Message}", ex.Message);
            }
        }

        /// <summary>
        /// Opens a connection to RabbitMQ, retrying on <see cref="SocketException"/> and
        /// <see cref="BrokerUnreachableException"/> up to <see cref="RetryCount"/> times,
        /// waiting <c>Math.Pow(2, attempt)</c> seconds before each retry.
        /// </summary>
        /// <returns>
        /// <c>true</c> when the connection is open and the failure handlers are subscribed;
        /// <c>false</c> when this instance is disposed or the created connection is not usable.
        /// </returns>
        /// <remarks>
        /// An exception thrown by <c>CreateConnection</c> that is not handled by the retry policy,
        /// or that is still thrown after the retries are used up, is not caught here.
        /// </remarks>
        public bool TryConnect()
        {
            // Connecting after disposal would create a connection that IsConnected can never
            // report as usable, so it would simply be orphaned.
            if (_disposed) return false;

            _logger.LogInformation("开始尝试连接RabbitMQ");
            lock (_syncRoot)
            {
                var policy = Policy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetry(RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                    {
                        // The delay is pre-formatted so the rendered text matches the previous output,
                        // while the exception itself is passed to the logger.
                        _logger.LogWarning(ex, "({Delay}s后)无法连接RabbitMQ:{Message}", time.TotalSeconds.ToString("n1"), ex.Message);
                    });

                var previousConnection = _connection;

                policy.Execute(() =>
                {
                    _connection = _connectionFactory.CreateConnection();
                });

                // Detach from the connection that was just replaced so events raised by it later
                // do not trigger further reconnects. It is intentionally not closed here because
                // channels handed out by CreateModel may still be in use by callers.
                if (previousConnection != null && !ReferenceEquals(previousConnection, _connection))
                {
                    UnsubscribeFrom(previousConnection);
                }

                if (IsConnected)
                {
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;

                    _logger.LogInformation("RabbitMQ客户端已成功连接到 {HostName}，并且已订阅相关失败事件", _connection.Endpoint.HostName);
                    return true;
                }
                else
                {
                    _logger.LogCritical("FATAL ERROR：无法创建并打开RabbitMQ连接！");
                    return false;
                }
            }
        }

        /// <summary>
        /// Removes this instance's failure handlers from the given connection.
        /// </summary>
        /// <param name="connection">Connection to detach from.</param>
        private void UnsubscribeFrom(IConnection connection)
        {
            connection.ConnectionShutdown -= OnConnectionShutdown;
            connection.CallbackException -= OnCallbackException;
            connection.ConnectionBlocked -= OnConnectionBlocked;
        }

        /// <summary>
        /// Handles the connection's <c>ConnectionBlocked</c> event by attempting to reconnect.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (not used).</param>
        private void OnConnectionBlocked(object sender, global::RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("RabbitMQ连接被关闭，正在尝试重连...");

            TryConnect();
        }

        /// <summary>
        /// Handles the connection's <c>CallbackException</c> event by attempting to reconnect.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (not used).</param>
        private void OnCallbackException(object sender, global::RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("连接RabbitMQ时发生异常，正在尝试重连...");

            TryConnect();
        }

        /// <summary>
        /// Handles the connection's <c>ConnectionShutdown</c> event by attempting to reconnect.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (not used).</param>
        private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("RabbitMQ连接处于关闭状态，正在尝试重连...");

            TryConnect();
        }

        #endregion
    }
}
